#include <cstddef>
#include <cstdint>
#include <cstring>
#include <cstdlib>
#include <cmath>
#include <atomic>
#include <functional>
#include <memory>
#include <utility>
#include <algorithm>
#include <tuple>
#include <string>
#include <random>
#include <chrono>
#include <stdexcept>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <locale>
#include <sstream>
#include <iostream>
#include <iomanip>

#ifndef _WIN32
#include <unistd.h>
#include <sys/wait.h>
#endif

#include <realm/util/terminate.hpp>
#include <realm/util/demangle.hpp>
#include <realm/util/enum.hpp>
#include <realm/util/time.hpp>
#include <realm/util/string_view.hpp>
#include <realm/util/buffer.hpp>
#include <realm/util/random.hpp>
#include <realm/util/thread.hpp>
#include <realm/util/network.hpp>
#include <realm/util/uri.hpp>
#include <realm/util/logger.hpp>
#include <realm/util/load_file.hpp>
#include <realm/util/resource_limits.hpp>
#include <realm/util/thread_exec_guard.hpp>
#include <realm/util/timestamp_logger.hpp>
#include <realm/util/substitute.hpp>
#include <realm/util/base64.hpp>
#include <realm/util/json_parser.hpp>
#include <realm/sync/impl/clock.hpp>
#include <realm/version.hpp>
#include <realm/sync/protocol.hpp>
#include <realm/sync/client.hpp>

#include "peer.hpp"

using namespace realm;
using namespace realm::test_client;

using milliseconds_type = sync::milliseconds_type;


namespace {

// Hard-coded signed access token for testing, laid out as
// `<Base64 user token>:<Base64 signature>`.
//
// The Base64-encoded user token is generated by the following command:
//     cat test_token.json | base64
// The Base64-encoded signature is generated by the following command:
//     cat test_token.json | openssl dgst -sha256 -binary -sign test.pem | base64
// The two are concatenated with a ':'.
//
// NOTE(review): presumably this is the "hard coded default access token" that
// the `--access-token` help text mentions -- confirm at the point of use.
const char g_signed_test_access_token[] =
    "ewogICAgImlkZW50aXR5IjogInRlc3QiLAogICAgImFjY2VzcyI6IFsiZG93bmxvYWQiLCAidXBs"
    "b2FkIl0sCiAgICAidGltZXN0YW1wIjogMTQ1NTUzMDYxNCwKICAgICJleHBpcmVzIjogbnVsbCwK"
    "ICAgICJhcHBfaWQiOiAiaW8ucmVhbG0uVGVzdCIKfQo="
    ":"
    "kPQwXUUFFVoDkmw02ouA1g7OlXcZ/IJPpqwJs9lIi1azpyuakBWgQ8VhnInCXh90CQXYhnteZlMw"
    "HYUZgt3/ED1jLu+nK2HlRKsmsOuAI20jMnHGGIZkql4/Ck9PEsvZ3huHGk5Jv9vpFp/dtnl1JXK2"
    "9XjdO8+1hU4boeJuKpTMDTPwGI9dxa8sTtvMMN7AVoPkKb1uqHZVsb5uRGE86Cyv58cvuj/EvZ1A"
    "yOCt5NGJwjTxydPgfX3QPcNMwDTHCRWYuoi2oTCINQHy8ebzXVLT1iy3adV4rM5bJukCnpLqHGlZ"
    "MIslk07zKdoj3igMIT47W9QwIuCw8x5f5cRIAg==";


// Authentication method selected with `--auth-method`:
//   none  No access token is obtained; one can be supplied directly with
//         `--access-token` or `--access-token-path`.
//   anon  The anonymous authentication scheme is attempted.
//   user  The username and password scheme is attempted.
enum class AuthMethod { none, anon, user };
// When to abort on a synchronization error (`--abort-on-error`).
enum class AbortOnError { fatal, always, never };
// Value accepted by `--ptime-request-threshold`.
// NOTE(review): the exact meaning of `start` vs. `reconnect` is not visible
// here -- confirm against the code that consumes `ptime_request_threshold`.
enum class PropagationTimeThreshold { start, reconnect };

// Value/name table for `AuthMethod`. `AuthMethodEnum` is built from it and is
// the type the command-line parser extracts `--auth-method` into.
// NOTE(review): the trailing `{0, nullptr}` entry presumably terminates the
// table for `util::Enum` -- confirm against realm/util/enum.hpp.
struct AuthMethodSpec {
    static util::EnumAssoc map[];
};
util::EnumAssoc AuthMethodSpec::map[] = {
    {static_cast<int>(AuthMethod::none), "none"},
    {static_cast<int>(AuthMethod::anon), "anon"},
    {static_cast<int>(AuthMethod::user), "user"},
    {0, nullptr},
};
using AuthMethodEnum = util::Enum<AuthMethod, AuthMethodSpec>;

// Value/name table for `AbortOnError`. `AbortOnErrorEnum` is built from it and
// is the type the command-line parser extracts `--abort-on-error` into.
// NOTE(review): the trailing `{0, nullptr}` entry presumably terminates the
// table for `util::Enum` -- confirm against realm/util/enum.hpp.
struct AbortOnErrorSpec {
    static util::EnumAssoc map[];
};
util::EnumAssoc AbortOnErrorSpec::map[] = {
    {static_cast<int>(AbortOnError::fatal), "fatal"},
    {static_cast<int>(AbortOnError::always), "always"},
    {static_cast<int>(AbortOnError::never), "never"},
    {0, nullptr},
};
using AbortOnErrorEnum = util::Enum<AbortOnError, AbortOnErrorSpec>;

// Value/name table for `PropagationTimeThreshold`.
// `PropagationTimeThresholdEnum` is built from it and is the type the
// command-line parser extracts `--ptime-request-threshold` into.
// NOTE(review): the trailing `{0, nullptr}` entry presumably terminates the
// table for `util::Enum` -- confirm against realm/util/enum.hpp.
struct PropagationTimeThresholdSpec {
    static util::EnumAssoc map[];
};
util::EnumAssoc PropagationTimeThresholdSpec::map[] = {
    {static_cast<int>(PropagationTimeThreshold::start), "start"},
    {static_cast<int>(PropagationTimeThreshold::reconnect), "reconnect"},
    {0, nullptr},
};
using PropagationTimeThresholdEnum = util::Enum<PropagationTimeThreshold, PropagationTimeThresholdSpec>;


// Settings for one phase of the testing schedule.
//
// The command-line parser writes the per-phase options into the last element
// of the `phases` vector, and `--next-phase` appends a new element that starts
// out as a copy of the previous one, so settings carry over between phases
// unless overridden.
struct PhaseSpec {
    // Number of transactions to perform per local Realm file in this phase
    // (`--num-transacts`, must be non-negative).
    int num_transacts = 0;
    milliseconds_type transact_period = 1000;  // Time in milliseconds between transactions
    milliseconds_type max_transact_period = 0; // Max time in milliseconds between transactions
    // Set by `--num-blobs` (must be non-negative).
    int num_blobs = 1;
    // Set by `--blob-size`.
    std::size_t blob_size = 0;
    // Set by `--blob-label`.
    std::string blob_label;
    // Set by `--blob-kind`.
    int blob_kind = 0;
    // Set by `--blob-level` and `--max-blob-level` respectively.
    int blob_level = 0;
    int max_blob_level = 0;
    // Enabled by `--replace-blobs`.
    bool replace_blobs = false;
    // Enabled by `--send-ptime-requests`. Only counts when the phase also has
    // `num_transacts > 0`, and then cannot be combined with
    // `--receive-ptime-requests`.
    bool send_ptime_requests = false;
};


// Builds a uniform integer distribution over the closed range [min, max].
//
// If `max` is less than `min`, the upper bound is raised to `min`, giving the
// degenerate range [min, min]. This keeps the distribution's `a <= b`
// precondition satisfied for any pair of arguments.
template <class T>
std::uniform_int_distribution<T> make_distr(T min, T max)
{
    const T upper = (min < max) ? max : min;
    return std::uniform_int_distribution<T>(min, upper);
}


// Root logger that forwards every message handed to its `do_log()` to another
// logger.
//
// Only a reference to the base logger is stored, so the base logger must
// outlive this object.
//
// NOTE(review): judging by the name, the purpose is to filter with this
// object's own level threshold rather than the base logger's -- confirm
// against util::RootLogger.
class ThresholdOverrideLogger : public util::RootLogger {
public:
    ThresholdOverrideLogger(util::Logger& base_logger)
        : util::RootLogger{}
        , m_base_logger{base_logger}
    {
    }

    // Passes `message` at `level` on to the base logger.
    void do_log(Logger::Level level, std::string message) override final
    {
        // `message` is already a private copy and is not used again, so move
        // it onward instead of copying it a second time.
        util::Logger::do_log(m_base_logger, level, std::move(message));
    }

private:
    util::Logger& m_base_logger; // Not owned
};


// Blocks the thread that calls run() until the program is asked to stop or the
// test procedure has ended. In interactive mode it first serves a small
// command prompt on standard input.
class MainEventLoop {
public:
    // Keeps a reference to the client (not owned). When `interactive` is true,
    // a hint about the `help` command is written to standard output.
    MainEventLoop(sync::Client&, bool interactive);

    // In interactive mode, reads commands (`help`, `quit`, `reconnect`) line
    // by line from standard input until `quit`, end of input, or a stop
    // request seen after a line has been read. Then, unless already stopped,
    // waits until stop() or end_of_test_proc() has been called.
    void run();
    // Sets the stop flag under the mutex and wakes run(). While run() is
    // blocked reading standard input, the flag is only noticed after the next
    // line arrives.
    void stop() noexcept;
    // Sets the end-of-test flag under the mutex and wakes run(). The flag is
    // only examined in the final wait of run(), not while commands are being
    // read.
    void end_of_test_proc() noexcept;

private:
    sync::Client& m_client;
    const bool m_interactive;

    util::Mutex m_mutex;
    util::CondVar m_cond; // Notified by stop() and end_of_test_proc()

    bool m_stop = false;             // Protected by `m_mutex`
    bool m_end_of_test_proc = false; // Protected by `m_mutex`
};

// Binds the loop to `client` and records whether interactive mode is on. In
// interactive mode a one-line usage hint is printed right away.
inline MainEventLoop::MainEventLoop(sync::Client& client, bool interactive)
    : m_client{client}
    , m_interactive{interactive}
{
    if (!m_interactive)
        return;
    std::cout << "Type `help` to get a list of available commands.\n";
}

void MainEventLoop::run()
{
    if (m_interactive) {
        std::string command;
        while (std::getline(std::cin, command)) {
            {
                util::LockGuard lock{m_mutex};
                if (REALM_UNLIKELY(m_stop))
                    return;
            }
            if (command == "help") {
                std::cout << "Available commands:\n"
                             "  help       Show available commands.\n"
                             "  quit       Quit interactive mode.\n"
                             "  reconnect  Cancel reconnect delay.\n";
            }
            else if (command == "quit") {
                break;
            }
            else if (command == "reconnect") {
                std::cout << "Canceling reconnect delay\n";
                m_client.cancel_reconnect_delay();
            }
        }
        std::cout << "Quitting interactive mode\n";
    }
    util::LockGuard lock{m_mutex};
    for (;;) {
        if (REALM_UNLIKELY(m_stop || m_end_of_test_proc))
            return;
        m_cond.wait(lock);
    }
}

// Asks run() to return.
inline void MainEventLoop::stop() noexcept
{
    // The flag is changed and the waiter is woken while the mutex is held, so
    // run() cannot check the flag and then miss the notification.
    util::LockGuard guard{m_mutex};
    m_stop = true;
    m_cond.notify_all();
}

// Tells run() that the test procedure has ended, which releases its final wait.
inline void MainEventLoop::end_of_test_proc() noexcept
{
    // The flag is changed and the waiter is woken while the mutex is held, so
    // run() cannot check the flag and then miss the notification.
    util::LockGuard guard{m_mutex};
    m_end_of_test_proc = true;
    m_cond.notify_all();
}


// Progress counters and a timer, bundled together.
// NOTE(review): presumably one instance exists per peer and the indexes track
// that peer's position in the testing schedule -- confirm against the code
// that uses it.
struct PeerControl {
    // Binds `transact_timer` to the given network service.
    PeerControl(util::network::Service&);
    int phase_ndx = 0;         // presumably index into the list of phases -- TODO confirm
    int transact_ndx = 0;      // presumably transaction index within the phase -- TODO confirm
    int download_wait_ndx = 0; // presumably counts download waits (`--num-download-waits`) -- TODO confirm
    util::network::DeadlineTimer transact_timer;
};

// All counters keep their in-class defaults of zero; only the timer needs the
// service.
inline PeerControl::PeerControl(util::network::Service& service)
    : transact_timer{service}
{
}


// Table of callable slots, one per step of the per-peer control flow. All
// slots are default-constructed (empty) `std::function` objects until assigned.
// NOTE(review): the `int` argument is presumably the index of the peer the
// step applies to, and the `*_for_all` slots presumably run once every peer
// has reached the corresponding point -- confirm where the slots are assigned.
struct PeerControlFuncs {
    std::function<void(int)> initiate_bind;
    // NOTE(review): the meaning of the two string arguments is not visible
    // here -- confirm at the assignment site.
    std::function<void(int, std::error_code ec, std::string, std::string)> logged_in;
    std::function<void(int)> on_bound;
    std::function<void(int)> begin_schedule;
    std::function<void(int)> launch_phase;
    std::function<void(int)> sched_perform_transaction;
    std::function<void(int)> perform_transaction;
    std::function<void(int)> check_end_of_schedule;
    std::function<void(int)> on_end_of_schedule;
    std::function<void(int)> initiate_wait_for_upload_completion;
    std::function<void(int)> on_upload_completion;
    std::function<void()> on_upload_completion_for_all;
    std::function<void(int)> initiate_wait_for_download_completion;
    std::function<void(int)> on_download_completion;
    std::function<void()> on_download_completion_for_all;
};

} // unnamed namespace


int main(int argc, char* argv[])
{
    std::string realm_path;
    std::string server_url;
    bool have_server_url = false;
    std::string app_id;
    std::string realm_name = "\"\""; // xjson
    bool app_id_specified = false;
    util::Logger::Level log_level = util::Logger::Level::info;
    util::Logger::Level sync_log_level = util::Logger::Level::warn;
    AbortOnErrorEnum abort_on_error = AbortOnError::fatal;
    bool log_timestamps = false;
    AuthMethodEnum auth_method = AuthMethod::none;
    std::string access_token;
    std::string access_token_path;
    std::string username, password;
    std::string request_base_path = "/api/client/v2.0";
    std::vector<PhaseSpec> phases = {{}};
    int total_num_transacts = 0;
    bool receive_ptime_requests = false;
    PropagationTimeThresholdEnum ptime_request_threshold = PropagationTimeThreshold::start;
    std::int_fast64_t originator_ident = 0;
    bool ensure_ptime_class = false;
    bool ensure_blob_class = false;
    bool download_first = false;
    // Client reset options are also used for async open.
    std::string client_reset_metadata_dir = "";
    bool disable_client_reset_recover_local_changes = false;
    bool disable_client_reset_require_recent_state_realm = false;
    int num_download_waits = 1;
    bool follow = false;
    bool interactive = false;
    int num_peers = 1;
    int num_growths = 1;                           // Number of growth steps
    milliseconds_type time_between_growths = 1000; // Time in milliseconds between growth steps
    std::string query_class = "Queryable";
    std::vector<std::string> ensure_query_classes;
    int queryable_level = 0;
    int max_queryable_level = 0;
    std::string queryable_text = "Foo";
    // class_name, num_rows, queryable_level, max_queryable_level, queryable_text
    std::vector<std::tuple<std::string, int, int, int, std::string>> generate_queryables;
    std::vector<std::pair<std::string, std::string>> add_queries;
    bool dump_result_sets = false;
    milliseconds_type start_delay = 0;     // Time in milliseconds to delay start of test process
    milliseconds_type max_start_delay = 0; // Max time in milliseconds to delay start of test process
    bool connection_per_session = false;
    bool disable_sync_to_disk = false;
    bool dry_run = false;
    milliseconds_type time_between_pings = 60000;     //  1 minute
    milliseconds_type pong_timeout = 120000;          //  2 minutes
    milliseconds_type connect_timeout = 120000;       //  2 minutes
    milliseconds_type connection_linger_time = 30000; // 30 seconds
    bool tcp_no_delay = false;
    bool verify_ssl_cert = false;
    std::string ssl_trust_cert = "";
    std::string metrics_prefix = "realm";
    std::string statsd_address = "localhost";
    int statsd_port = 8125;
    bool report_roundtrip_times = false;
    bool halt_on_crash = false;
    bool allow_core_dump = false;
    bool disable_upload_compaction = false;
    bool use_trivial_cooker = false;

    // Process command line
    {
        const char* prog = argv[0];
        --argc;
        ++argv;
        bool error = false;
        bool help = false;
        int argc_2 = 0;
        int i = 0;
        char* arg = nullptr;
        auto get_string_value = [&](std::string& var) {
            if (i + 1 < argc) {
                var = argv[++i];
                return true;
            }
            return false;
        };
        auto get_parsed_value_with_check = [&](auto& var, auto check_val) {
            std::string str_val;
            if (get_string_value(str_val)) {
                std::istringstream in(str_val);
                in.imbue(std::locale::classic());
                in.unsetf(std::ios_base::skipws);
                using value_type = typename std::remove_reference<decltype(var)>::type;
                value_type val = value_type{};
                in >> val;
                if (in && in.eof() && check_val(val)) {
                    var = val;
                    return true;
                }
            }
            return false;
        };
        auto get_parsed_value = [&](auto& var) {
            return get_parsed_value_with_check(var, [](auto) {
                return true;
            });
        };
        for (; i < argc; ++i) {
            arg = argv[i];
            if (arg[0] != '-') {
                argv[argc_2++] = arg;
                continue;
            }
            if (std::strcmp(arg, "-h") == 0 || std::strcmp(arg, "--help") == 0) {
                help = true;
                continue;
            }
            else if (std::strcmp(arg, "--app-id") == 0) {
                if (get_string_value(app_id)) {
                    app_id_specified = true;
                    continue;
                }
            }
            else if (std::strcmp(arg, "--realm-name") == 0) {
                if (get_string_value(realm_name)) {
                    app_id_specified = true;
                    continue;
                }
            }
            else if (std::strcmp(arg, "-l") == 0 || std::strcmp(arg, "--log-level") == 0) {
                if (get_parsed_value(log_level))
                    continue;
            }
            else if (std::strcmp(arg, "-k") == 0 || std::strcmp(arg, "--sync-log-level") == 0) {
                if (get_parsed_value(sync_log_level))
                    continue;
            }
            else if (std::strcmp(arg, "-E") == 0 || std::strcmp(arg, "--abort-on-error") == 0) {
                if (get_parsed_value(abort_on_error))
                    continue;
            }
            else if (std::strcmp(arg, "-K") == 0 || std::strcmp(arg, "--log-timestamps") == 0) {
                log_timestamps = true;
                continue;
            }
            else if (std::strcmp(arg, "--auth-method") == 0) {
                if (get_parsed_value(auth_method))
                    continue;
            }
            else if (std::strcmp(arg, "-A") == 0 || std::strcmp(arg, "--access-token") == 0) {
                if (get_string_value(access_token))
                    continue;
            }
            else if (std::strcmp(arg, "-P") == 0 || std::strcmp(arg, "--access-token-path") == 0) {
                if (get_string_value(access_token_path))
                    continue;
            }
            else if (std::strcmp(arg, "-u") == 0 || std::strcmp(arg, "--username") == 0) {
                if (get_string_value(username))
                    continue;
            }
            else if (std::strcmp(arg, "-p") == 0 || std::strcmp(arg, "--password") == 0) {
                if (get_string_value(password))
                    continue;
            }
            else if (std::strcmp(arg, "-request-base-path") == 0) {
                if (get_string_value(request_base_path))
                    continue;
            }
            else if (std::strcmp(arg, "-n") == 0 || std::strcmp(arg, "--num-transacts") == 0) {
                auto& var = phases.back().num_transacts;
                if (get_parsed_value_with_check(var, [](auto v) {
                        return v >= 0;
                    }))
                    continue;
            }
            else if (std::strcmp(arg, "-t") == 0 || std::strcmp(arg, "--transact-period") == 0) {
                auto& var = phases.back().transact_period;
                if (get_parsed_value_with_check(var, [](auto v) {
                        return v >= 0;
                    }))
                    continue;
            }
            else if (std::strcmp(arg, "-m") == 0 || std::strcmp(arg, "--max-transact-period") == 0) {
                auto& var = phases.back().max_transact_period;
                if (get_parsed_value_with_check(var, [](auto v) {
                        return v >= 0;
                    }))
                    continue;
            }
            else if (std::strcmp(arg, "-B") == 0 || std::strcmp(arg, "--num-blobs") == 0) {
                auto& var = phases.back().num_blobs;
                if (get_parsed_value_with_check(var, [](auto v) {
                        return v >= 0;
                    }))
                    continue;
            }
            else if (std::strcmp(arg, "-s") == 0 || std::strcmp(arg, "--blob-size") == 0) {
                auto& var = phases.back().blob_size;
                if (get_parsed_value(var))
                    continue;
            }
            else if (std::strcmp(arg, "--blob-label") == 0) {
                auto& var = phases.back().blob_label;
                if (get_string_value(var))
                    continue;
            }
            else if (std::strcmp(arg, "--blob-kind") == 0) {
                auto& var = phases.back().blob_kind;
                if (get_parsed_value(var))
                    continue;
            }
            else if (std::strcmp(arg, "-z") == 0 || std::strcmp(arg, "--blob-level") == 0) {
                auto& var = phases.back().blob_level;
                if (get_parsed_value(var))
                    continue;
            }
            else if (std::strcmp(arg, "-Z") == 0 || std::strcmp(arg, "--max-blob-level") == 0) {
                auto& var = phases.back().max_blob_level;
                if (get_parsed_value(var))
                    continue;
            }
            else if (std::strcmp(arg, "-a") == 0 || std::strcmp(arg, "--replace-blobs") == 0) {
                auto& var = phases.back().replace_blobs;
                var = true;
                continue;
            }
            else if (std::strcmp(arg, "--next-phase") == 0) {
                PhaseSpec init = phases.back();
                phases.push_back(std::move(init));
                continue;
            }
            else if (std::strcmp(arg, "-S") == 0 || std::strcmp(arg, "--send-ptime-requests") == 0) {
                auto& var = phases.back().send_ptime_requests;
                var = true;
                continue;
            }
            else if (std::strcmp(arg, "-R") == 0 || std::strcmp(arg, "--receive-ptime-requests") == 0) {
                receive_ptime_requests = true;
                continue;
            }
            else if (std::strcmp(arg, "-F") == 0 || std::strcmp(arg, "--ptime-request-threshold") == 0) {
                if (get_parsed_value(ptime_request_threshold))
                    continue;
            }
            else if (std::strcmp(arg, "-X") == 0 || std::strcmp(arg, "--originator-ident") == 0) {
                if (get_parsed_value(originator_ident))
                    continue;
            }
            else if (std::strcmp(arg, "-y") == 0 || std::strcmp(arg, "--ensure-ptime-class") == 0) {
                ensure_ptime_class = true;
                continue;
            }
            else if (std::strcmp(arg, "-Y") == 0 || std::strcmp(arg, "--ensure-blob-class") == 0) {
                ensure_blob_class = true;
                continue;
            }
            else if (std::strcmp(arg, "-w") == 0 || std::strcmp(arg, "--download-first") == 0) {
                download_first = true;
                continue;
            }
            else if (std::strcmp(arg, "--client-reset-metadata-dir") == 0) {
                if (get_string_value(client_reset_metadata_dir))
                    continue;
            }
            else if (std::strcmp(arg, "--disable-client-reset-recover-local-changes") == 0) {
                disable_client_reset_recover_local_changes = true;
                continue;
            }
            else if (std::strcmp(arg, "--disable-client-reset-require-recent-state-realm") == 0) {
                disable_client_reset_require_recent_state_realm = true;
                continue;
            }
            else if (std::strcmp(arg, "-W") == 0 || std::strcmp(arg, "--num-download-waits") == 0) {
                if (get_parsed_value_with_check(num_download_waits, [](auto v) {
                        return v >= 0;
                    }))
                    continue;
            }
            else if (std::strcmp(arg, "-i") == 0 || std::strcmp(arg, "--interactive") == 0) {
                interactive = true;
                continue;
            }
            else if (std::strcmp(arg, "-f") == 0 || std::strcmp(arg, "--follow") == 0) {
                follow = true;
                continue;
            }
            else if (std::strcmp(arg, "-N") == 0 || std::strcmp(arg, "--num-peers") == 0) {
                if (get_parsed_value_with_check(num_peers, [](auto v) {
                        return v >= 0;
                    }))
                    continue;
            }
            else if (std::strcmp(arg, "-g") == 0 || std::strcmp(arg, "--num-growths") == 0) {
                if (get_parsed_value_with_check(num_growths, [](auto v) {
                        return v >= 1;
                    }))
                    continue;
            }
            else if (std::strcmp(arg, "-G") == 0 || std::strcmp(arg, "--time-between-growths") == 0) {
                if (get_parsed_value_with_check(time_between_growths, [](auto v) {
                        return v >= 0;
                    }))
                    continue;
            }
            else if (std::strcmp(arg, "-C") == 0 || std::strcmp(arg, "--query-class") == 0) {
                if (get_string_value(query_class))
                    continue;
            }
            else if (std::strcmp(arg, "-e") == 0 || std::strcmp(arg, "--ensure-query-class") == 0) {
                ensure_query_classes.emplace_back(query_class);
                continue;
            }
            else if (std::strcmp(arg, "-j") == 0 || std::strcmp(arg, "--queryable-level") == 0) {
                if (get_parsed_value(queryable_level))
                    continue;
            }
            else if (std::strcmp(arg, "-J") == 0 || std::strcmp(arg, "--max-queryable-level") == 0) {
                if (get_parsed_value(max_queryable_level))
                    continue;
            }
            else if (std::strcmp(arg, "-b") == 0 || std::strcmp(arg, "--queryable-text") == 0) {
                if (get_string_value(queryable_text))
                    continue;
            }
            else if (std::strcmp(arg, "-Q") == 0 || std::strcmp(arg, "--generate-queryable") == 0) {
                int n = 0;
                if (get_parsed_value(n)) {
                    generate_queryables.emplace_back(query_class, n, queryable_level, max_queryable_level,
                                                     queryable_text);
                    continue;
                }
            }
            else if (std::strcmp(arg, "-q") == 0 || std::strcmp(arg, "--add-query") == 0) {
                std::string query;
                if (get_string_value(query)) {
                    add_queries.emplace_back(query_class, query);
                    continue;
                }
            }
            else if (std::strcmp(arg, "-r") == 0 || std::strcmp(arg, "--dump-result-sets") == 0) {
                dump_result_sets = true;
                continue;
            }
            else if (std::strcmp(arg, "--start-delay") == 0) {
                if (get_parsed_value_with_check(start_delay, [](auto v) {
                        return v >= 0;
                    }))
                    continue;
            }
            else if (std::strcmp(arg, "--max-start-delay") == 0) {
                if (get_parsed_value_with_check(max_start_delay, [](auto v) {
                        return v >= 0;
                    }))
                    continue;
            }
            else if (std::strcmp(arg, "-c") == 0 || std::strcmp(arg, "--connection-per-session") == 0) {
                connection_per_session = true;
                continue;
            }
            else if (std::strcmp(arg, "-d") == 0 || std::strcmp(arg, "--disable-sync-to-disk") == 0) {
                disable_sync_to_disk = true;
                continue;
            }
            else if (std::strcmp(arg, "-D") == 0 || std::strcmp(arg, "--dry-run") == 0) {
                dry_run = true;
                continue;
            }
            else if (std::strcmp(arg, "-I") == 0 || std::strcmp(arg, "--time-between-pings") == 0) {
                if (get_parsed_value_with_check(time_between_pings, [](auto v) {
                        return v >= 0;
                    }))
                    continue;
            }
            else if (std::strcmp(arg, "-O") == 0 || std::strcmp(arg, "--pong-timeout") == 0) {
                if (get_parsed_value_with_check(pong_timeout, [](auto v) {
                        return v >= 0;
                    }))
                    continue;
            }
            else if (std::strcmp(arg, "-U") == 0 || std::strcmp(arg, "--connect-timeout") == 0) {
                if (get_parsed_value_with_check(connect_timeout, [](auto v) {
                        return v >= 0;
                    }))
                    continue;
            }
            else if (std::strcmp(arg, "-L") == 0 || std::strcmp(arg, "--connection-linger-time") == 0) {
                if (get_parsed_value_with_check(connection_linger_time, [](auto v) {
                        return v >= 0;
                    }))
                    continue;
            }
            else if (std::strcmp(arg, "-o") == 0 || std::strcmp(arg, "--tcp-no-delay") == 0) {
                tcp_no_delay = true;
                continue;
            }
            else if (std::strcmp(arg, "-V") == 0 || std::strcmp(arg, "--verify-ssl-cert") == 0) {
                verify_ssl_cert = true;
                continue;
            }
            else if (std::strcmp(arg, "-T") == 0 || std::strcmp(arg, "--ssl-trust-cert") == 0) {
                if (get_string_value(ssl_trust_cert))
                    continue;
            }
            else if (std::strcmp(arg, "-M") == 0 || std::strcmp(arg, "--metrics-prefix") == 0) {
                if (get_string_value(metrics_prefix))
                    continue;
            }
            else if (std::strcmp(arg, "--statsd-address") == 0) {
                if (get_string_value(statsd_address))
                    continue;
            }
            else if (std::strcmp(arg, "--statsd-port") == 0) {
                if (get_parsed_value_with_check(statsd_port, [](auto v) {
                        return v <= 0xFFFF;
                    }))
                    continue;
            }
            else if (std::strcmp(arg, "-v") == 0 || std::strcmp(arg, "--report-roundtrip-times") == 0) {
                report_roundtrip_times = true;
                continue;
            }
            else if (std::strcmp(arg, "-H") == 0 || std::strcmp(arg, "--halt-on-crash") == 0) {
                halt_on_crash = true;
                continue;
            }
            else if (std::strcmp(arg, "-x") == 0 || std::strcmp(arg, "--allow-core-dump") == 0) {
                allow_core_dump = true;
                continue;
            }
            else if (std::strcmp(arg, "--disable-upload-compaction") == 0) {
                disable_upload_compaction = true;
                continue;
            }
            else if (std::strcmp(arg, "--use-trivial-cooker") == 0) {
                use_trivial_cooker = true;
                continue;
            }
            else {
                std::cerr << "ERROR: Unknown option: " << arg << "\n";
                error = true;
                continue;
            }
            std::cerr << "ERROR: Bad or missing value for option: " << arg << "\n";
            error = true;
        }
        argc = argc_2;

        if (argc < 1 || argc > 2)
            error = true;

        bool send_ptime_requests = false;
        for (const PhaseSpec& phase : phases) {
            total_num_transacts += phase.num_transacts;
            if (phase.num_transacts > 0 && phase.send_ptime_requests)
                send_ptime_requests = true;
        }
        bool bad_combinations = ((total_num_transacts > 0 && dry_run) || (receive_ptime_requests && dry_run) ||
                                 (send_ptime_requests && receive_ptime_requests));
        if (bad_combinations)
            error = true;

        if (auth_method != AuthMethod::none && !app_id_specified) {
            std::cerr << "ERROR: Need Stitch application identifier (`--app-id`)\n";
            error = true;
        }

        if (help) {
            std::cerr << "Synopsis: " << prog
                      << "  PATH  [URL]\n"
                         "\n"
                         "PATH is the local file-system path of a client-side Realm file, and URL\n"
                         "specifies the server address and protocol envelope. If the URL is not\n"
                         "specified, no synchronization is done.\n"
                         "\n"
                         "Certain substitution parameters, if found in PATH, URL, Realm identifier\n"
                         "(`--realm-name`), username (`--username`), or subscription queries\n"
                         "(`--add-query`), will be replaced as follows:\n"
                         "  @N    The numer of the curresponding peer (one-based) with leading zeroes.\n"
                         "  @I    The index of the curresponding peer (zero-based) without leading zeroes.\n"
                         "  @H    The host name as returned by the `hostname` command.\n"
                         "  @@    The character `@`.\n"
                         "Substitutions can either be specified on short form, `@X` (only when length of\n"
                         "name is 1), or on long form, `@{X}`.\n"
                         "\n"
                         "Options:\n"
                         "  -h, --help           Display command-line synopsis followed by the list of\n"
                         "                       available options.\n"
                         "  --app-id             The Stitch application identifier (e.g. `foo-rfhxk`).\n"
                         "                       This will be used to construct HTTP request paths. See\n"
                         "                       also `--request-base-path`.\n"
                         "  --realm-name         A string on xjson format that identifies a particular\n"
                         "                       Realm on the server. The default value is \"\", i.e.,\n"
                         "                       the empty string expressed in xjson format.\n"
                         "  -l, --log-level      Set the log level of the testing process. Valid values\n"
                         "                       are `all`, `trace`, `debug`, `detail`, `info`, `warn`,\n"
                         "                       `error`, `fatal`, and `off`. It is `info` by default.\n"
                         "  -k, --sync-log-level  Set the log level of the synchronization process. Valid\n"
                         "                       values are the same as for `--log-level`. It is `warn`\n"
                         "                       by default.\n"
                         "  -E, --abort-on-error  Specify when to abort on a synchronization error. Valid\n"
                         "                       values are `fatal`, `always`, and `never`. It is `fatal`\n"
                         "                       by default.\n"
                         "  -K, --log-timestamps  Include timestamps in the log output.\n"
                         "  --auth-method        The authentication method. Can be `none`, `anon`, or\n"
                         "                       `user`. Is `none` by default. When `none`, no access\n"
                         "                       token is obtained. Instead, the access token can be\n"
                         "                       specified directly using `--access-token` or\n"
                         "                       `--access-token-path`. When `anon`, the anonymous\n"
                         "                       authentication scheme is attempted. When `user`, the\n"
                         "                       username and password scheme is attempted. In this case,\n"
                         "                       the username and password can be specified using\n"
                         "                       `--username` and `--password`.\n"
                         "  -A, --access-token   Specify the access token to be passed to the server when\n"
                         "                       authentication method is `none` (see `--auth-method`). If\n"
                         "                       no access token is specified, the client will use a hard\n"
                         "                       coded default access token, which has administrator\n"
                         "                       rights and is signed with the default private key.\n"
                         "  -P, --access-token-path  Specify the file containing the access token to be\n"
                         "                       passed to the server (see also `--access-token`).\n"
                         "  -u, --username       Specify the username to be used when the authentication\n"
                         "                       method is `user` (see `--auth-method`). The specified\n"
                         "                       username is subject to parameter substitutions (`@N`\n"
                         "                       etc.).\n"
                         "  -p, --password       The password associated with the specified username\n"
                         "                       (`--username`).\n"
                         "  --request-base-path  Set the base path for constructing HTTP request paths.\n"
                         "                       The default is `/api/client/v2.0`.\n"
                         "  -n, --num-transacts  Set the number of transactions to perform per local\n"
                         "                       Realm file in the current phase of the testing schedule\n"
                         "                       (0 by default). Must be zero if dry run (`--dry-run`) is\n"
                         "                       enabled. Each transaction can add (or replace) blobs\n"
                         "                       (see `--num-blobs`) and send changeset propagation time\n"
                         "                       measurement requests (see `--send-ptime-requests`). See\n"
                         "                       `--next-phase` for an explanation of the structure of\n"
                         "                       the testing schedule.\n"
                         "  -t, --transact-period  Time in milliseconds between transactions performed in\n"
                         "                       the current phase of the testing schedule (default is\n"
                         "                       1'000). See also `--next-phase`.\n"
                         "  -m, --max-transact-period  Maximum time in milliseconds between transactions\n"
                         "                       performed in the current phase of the testing schedule\n"
                         "                       (default is 0). If larger than `--transact-period`, the\n"
                         "                       timeout will be selected randomly between\n"
                         "                       `--transact-period` and `--max-transact-period`.\n"
                         "  -B, --num-blobs      Set the number of blobs to be added (or replaced) in\n"
                         "                       each transaction of the current phase of the testing\n"
                         "                       schedule (1 by default). Can be zero. Each blob is an\n"
                         "                       object of class `Blob` (see `--ensure-blob-class`). The\n"
                         "                       values of the properties of the blob objects are\n"
                         "                       controlled by `--blob-size`, `--blob-label`,\n"
                         "                       `--blob-kind`, `--blob-level`, and `--max-blob-level`.\n"
                         "                       See also `--replace-blobs`. See `--next-phase` for an\n"
                         "                       explanation of the structure of the testing schedule.\n"
                         "  -s, --blob-size      Set the size of the blobs that are added during each\n"
                         "                       transaction (0 by default). The individual byte values\n"
                         "                       will be randomized. The specified value applies to the\n"
                         "                       current phase of the testing schedule (see\n"
                         "                       `--next-phase`).\n"
                         "  --blob-label         The string value to be used for the `label` property of\n"
                         "                       new blobs (objects of class `Blob`). The default value\n"
                         "                       is the empty string. The specified value applies to the\n"
                         "                       current phase of the testing schedule (see\n"
                         "                       `--next-phase`).\n"
                         "  --blob-kind          The integer value to be used for the `kind` property of\n"
                         "                       new blobs (objects of class `Blob`). The default value\n"
                         "                       is zero. The specified value applies to the current\n"
                         "                       phase of the testing schedule (see `--next-phase`).\n"
                         "  -z, --blob-level     The integer value for the `level` property of new blobs\n"
                         "                       (objects of class `Blob`) will be drawn randomly from a\n"
                         "                       rectangular distribution between `--blob-level` and\n"
                         "                       `--max-blob-level` if the latter is greater than, or\n"
                         "                       equal to the former, otherwise the value of that\n"
                         "                       property will be what is specified by `--blob-level`.\n"
                         "                       The default value for this option is zero. The specified\n"
                         "                       value applies to the current phase of the testing\n"
                         "                       schedule (see `--next-phase`).\n"
                         "  -Z, --max-blob-level  See `--blob-level`. The default value for this option\n"
                         "                       is zero. The specified value applies to the current\n"
                         "                       phase of the testing schedule (see `--next-phase`).\n"
                         "  -a, --replace-blobs  Replace existing blobs instead of always adding new\n"
                         "                       ones. New ones will be added until the total preexisting\n"
                         "                       number is greater than, or equal to `--num-blobs`. This\n"
                         "                       option applies to the current phase of the testing\n"
                         "                       schedule (see `--next-phase`).\n"
                         "  --next-phase         Introduce a new phase into the testing schedule. The\n"
                         "                       testing schedule has one phase initially, and each\n"
                         "                       occurrence of `--next-phase` adds a new phase to the\n"
                         "                       schedule. Each phase has zero or more transactions\n"
                         "                       (`--num-transacts`) spaced out in time\n"
                         "                       (`--transact-period`). The testing schedule runs\n"
                         "                       independently for each peer. Options `--num-transacts`,\n"
                         "                       `--transact-period`, `--max-transact-period`,\n"
                         "                       `--num-blobs`, `--blob-size`, `--blob-label`,\n"
                         "                       `--blob-kind`, `--blob-level`, `--max-blob-level`,\n"
                         "                       `--replace-blobs`, and `--send-ptime-requests` apply to\n"
                         "                       the last introduced phase at that point on the command\n"
                         "                       line. When a new phase is introduced, its parameters are\n"
                         "                       copied from the previous phase, but the parameters can\n"
                         "                       then be altered with the mentioned options.\n"
                         "  -S, --send-ptime-requests  Send a changeset propagation time measurement\n"
                         "                       request in each transaction (see `--num-transacts`).\n"
                         "                       Cannot be combined with `--receive-ptime-requests`.\n"
                         "                       Each request consists of a new object of class\n"
                         "                       `PropagationTime` (see `--ensure-ptime-class`). See also\n"
                         "                       `--receive-ptime-requests`. This option applies to the\n"
                         "                       current phase of the testing schedule (see\n"
                         "                       `--next-phase`).\n"
                         "  -R, --receive-ptime-requests  Listen for changeset propagation time\n"
                         "                       measurement requests. Cannot be combined with\n"
                         "                       `--send-ptime-requests`, nor with `--dry-run`.\n"
                         "  -F, --ptime-request-threshold  Specifies which incoming changeset propagation\n"
                         "                       time measurement requests are to be ignored. The value\n"
                         "                       can be either `start` or `reconnect`. If it is `start`\n"
                         "                       (the default), then incoming requests are ignored if\n"
                         "                       they were initiated before the peer was started. If it\n"
                         "                       is `reconnect`, then incoming requests are ignored if\n"
                         "                       they were initiated before the connection to the server\n"
                         "                       was last established.\n"
                         "  -X, --originator-ident  The originator identifier of changeset\n"
                         "                       propagation time measurement requests (default is 0).\n"
                         "                       Used when sending such requests. When receiving such\n"
                         "                       requests, those that carry a different identifier\n"
                         "                       will be ignored. To avoid relying on perfect\n"
                         "                       cross-host time synchronisation, set this identifier\n"
                         "                       to different values on different hosts.\n"
                         "  -y, --ensure-ptime-class  If it does not already exist, create the class\n"
                         "                       `PropagationTime` with properties `originator` and\n"
                         "                       `timestamp`, both of type `integer` for the purpose of\n"
                         "                       changeset propagation time measurements (see\n"
                         "                       `--send-ptime-requests` and `--receive-ptime-requests`).\n"
                         "                       The class will be created before any queries are added.\n"
                         "  -Y, --ensure-blob-class  If it does not already exist, create the class\n"
                         "                       `Blob` with properties `blob`, `label`, `kind`, and\n"
                         "                       `level` of types `binary`, `string`, `integer`, and\n"
                         "                       `integer` respectively. The class will be created before\n"
                         "                       any queries are added.\n"
                         "  -w, --download-first  Wait for download completion before initiating the\n"
                         "                       testing process.\n"
                         "  -W, --num-download-waits  Set the number of times to repeat the wait for\n"
                         "                       download completion on each peer, that is, after the\n"
                         "                       initiation of the testing process. The default number\n"
                         "                       is 1.\n"
                         "  -i, --interactive    Enter into interactive mode after upload and download\n"
                         "                       completion. In this mode, commands are read from STDIN.\n"
                         "                       Type `help` to see the list of available commands.\n"
                         "  -f, --follow         Keep the session(s) open after upload and download\n"
                         "                       completion, or after exit from interactive mode if\n"
                         "                       `--interactive` was specified.\n"
                         "  -N, --num-peers      Set the number of local Realm files to be\n"
                         "                       synchronized against the specified server-side Realm\n"
                         "                       (1 by default). When this is more than one, PATH must\n"
                         "                       contain the substitution parameter `@N`.\n"
                         "  -g, --num-growths    The number of steps to use for growing the number of\n"
                         "                       peers from zero to whatever is specified by\n"
                         "                       `--num-peers`. The default number of steps is 1.\n"
                         "  -G, --time-between-growths  The time in milliseconds between growth steps\n"
                         "                       (see `--num-growths`). The default value is 1'000.\n"
                         "  -C, --query-class    Set the name of the class to be targeted by subsequent\n"
                         "                       `--ensure-query-class`, `--generate-queryable`, and\n"
                         "                       `--add-query` options. The default name is `Queryable`.\n"
                         "  -e, --ensure-query-class  Create the class specified by `--query-class` if it\n"
                         "                       does not already exist. Then, add properties `level` and\n"
                         "                       `text` with types `integer` and `string` if they do not\n"
                         "                       already exist. Several tables can be created by\n"
                         "                       including this option multiple times.\n"
                         "  -j, --queryable-level  The integer value for the `level` property of objects\n"
                         "                       generated by subsequent `--generate-queryable` options\n"
                         "                       will be drawn randomly from a rectangular distribution\n"
                         "                       between `--queryable-level` and `--max-queryable-level`\n"
                         "                       if the latter is greater than, or equal to the former,\n"
                         "                       otherwise the value of that property will be what is\n"
                         "                       specified by `--queryable-level`. The default value for\n"
                         "                       this option is zero.\n"
                         "  -J, --max-queryable-level  See `--queryable-level`. The default value for\n"
                         "                       this option is zero.\n"
                         "  -b, --queryable-text  The value of the `text` property to use for\n"
                         "                       subsequent `--generate-queryable` options. The default\n"
                         "                       value is `Foo`.\n"
                         "  -Q, --generate-queryable  Add the specified number of objects to the class\n"
                         "                       specified by `--query-class`, which will be assumed to\n"
                         "                       have the schema created by `--ensure-query-class`.\n"
                         "                       Property values are specified using `--queryable-level`,\n"
                         "                       `--max-queryable-level`, and `--queryable-text`. Each\n"
                         "                       occurrence of this option will add objects separately.\n"
                         "  -q, --add-query      Add another subscription query. The class name is\n"
                         "                       specified using `--query-class`. Each occurrence of this\n"
                         "                       option will add a separate query. If combined with\n"
                         "                       `--download-first`, the queries will be added after\n"
                         "                       everything is downloaded from the server. The specified\n"
                         "                       query string is subject to parameter substitutions (`@N`\n"
                         "                       etc.).\n"
                         "  -r, --dump-result-sets  Dump the result sets of all the previously submitted\n"
                         "                       subscription queries.\n"
                         "  --start-delay        Time in milliseconds to delay the start of the testing\n"
                         "                       process (default is 0).\n"
                         "  --max-start-delay    Maximum time in milliseconds to delay the start of the\n"
                         "                       testing process (default is 0). If larger than\n"
                         "                       `--start-delay`, the delay will be selected randomly\n"
                         "                       between `--start-delay` and `--max-start-delay`\n"
                         "                       (rectangular distribution).\n"
                         "  -c, --connection-per-session  Establish a separate network connection per\n"
                         "                       sync session. Note that there is one sync session\n"
                         "                       per local Realm file to be synchronized\n"
                         "                       (`--num-peers`).\n"
                         "  -d, --disable-sync-to-disk  Disable sync to disk (msync(), fsync()).\n"
                         "  -D, --dry-run        Do not access the local file system. Sessions will\n"
                         "                       act as if initiated on behalf of an empty (or\n"
                         "                       nonexisting) local Realm file. Received DOWNLOAD\n"
                         "                       messages will be accepted, but otherwise ignored.\n"
                         "                       No UPLOAD messages will be generated. Requires that\n"
                         "                       the number of transactions (`--num-transacts`) is\n"
                         "                       zero. Cannot be combined with\n"
                         "                       `--receive-ptime-requests`.\n"
                         "  -I, --time-between-pings  Time in milliseconds between PING messages\n"
                         "                       (heartbeat). The default value is 60'000 (1 minute).\n"
                         "  -O, --pong-timeout   Maximum time in milliseconds to allow for a PONG message\n"
                         "                       to be received after the corresponding PING message was\n"
                         "                       sent. The default value is 120'000 (2 minutes).\n"
                         "  -U, --connect-timeout  Maximum time in milliseconds to allow for a connection\n"
                         "                       to become fully established. This includes the time to\n"
                         "                       resolve the network address, the TCP connect operation,\n"
                         "                       the SSL handshake, and the WebSocket handshake. The\n"
                         "                       default is 120'000 (2 minutes).\n"
                         "  -L, --connection-linger-time  Time in milliseconds to keep a connection\n"
                         "                       open after all sessions have been abandoned or\n"
                         "                       suspended by errors. The default is 30'000 (30 seconds).\n"
                         "  -o, --tcp-no-delay   Set the `TCP_NODELAY` option on all TCP/IP sockets.\n"
                         "                       This disables the Nagle algorithm.\n"
                         "  -V, --verify-ssl-cert  Verify the server's SSL certificate.\n"
                         "  -T, --ssl-trust-cert  Path of a trust certificate used for verification.\n"
                         "  -M, --metrics-prefix  Prefix for metrics labels (`realm` by default). The\n"
                         "                       effective prefix is what you specify plus a dot (`.`).\n"
                         "  --statsd-address     Host name of StatsD server (`localhost` by\n"
                         "                       default).\n"
                         "  --statsd-port        Port number of StatsD server (8125 by default).\n"
                         "  -v, --report-roundtrip-times\n"
                         "                       Report heartbeat roundtrip times to StatsD server.\n"
                         "  -H, --halt-on-crash  Execute the test client as a child process, and if that\n"
                         "                       child process crashes (exits with nonzero status or is\n"
                         "                       killed by a signal), make the parent sleep\n"
                         "                       indefinitely.\n"
                         "  -x, --allow-core-dump  If supported by the platform, set the maximum size of\n"
                         "                       core files to 'unlimited', thereby allowing for core\n"
                         "                       files to be created when the process is killed.\n"
                         "  --disable-upload-compaction\n"
                         "                       Disable compaction during upload.\n"
                         "  --use-trivial-cooker  Associate a trivial changeset cooker with each\n"
                         "                       synchronization session. This causes a cooked history to\n"
                         "                       be produced in the corresponding Realm files.\n";
            return EXIT_SUCCESS;
        }

        if (error) {
            std::cerr << "ERROR: Bad command line.\n"
                         "Try `"
                      << prog << " --help`\n";
            return EXIT_FAILURE;
        }

        realm_path = argv[0];
        if (argc > 1) {
            server_url = argv[1];
            have_server_url = true;
        }
    }

    // Resolve the access token when none was given directly on the command
    // line: fall back to the hard-coded signed test token unless a token file
    // was specified, in which case the token is loaded from that file.
    if (access_token.empty()) {
        if (access_token_path.empty())
            access_token = g_signed_test_access_token;
        else
            access_token = util::load_file_and_chomp(access_token_path);
    }

    // Client reset is configured only when a metadata directory was specified;
    // otherwise the optional stays disengaged. The two "disable" switches are
    // inverted into the corresponding positive settings.
    util::Optional<sync::Session::Config::ClientReset> client_reset_config;
    if (client_reset_metadata_dir != "") {
        sync::Session::Config::ClientReset reset_settings;
        reset_settings.require_recent_state_realm = !disable_client_reset_require_recent_state_realm;
        reset_settings.recover_local_changes = !disable_client_reset_recover_local_changes;
        reset_settings.metadata_dir = client_reset_metadata_dir;
        client_reset_config = reset_settings;
    }

    // "Halt on crash" mode: run the actual test client in a forked child
    // process and let this (parent) process supervise it. If the child exits
    // cleanly, the parent exits too; if the child fails or is killed by a
    // signal, the parent reports it and then sleeps forever.
    if (halt_on_crash) {
#ifndef _WIN32
        pid_t child_pid = fork();
        if (child_pid == -1) {
            // Capture errno immediately, before any other call can change it.
            int err = errno;
            std::error_code ec = util::make_basic_system_error_code(err);
            std::cerr << "fork() failed with " << ec.value() << ": " << ec.message() << "\n";
            return EXIT_FAILURE;
        }
        // Nonzero PID -> this is the parent. The child (PID zero) skips this
        // branch and continues with the rest of the function.
        if (child_pid != 0) {
            int status = 0;
            for (;;) {
                pid_t pid = wait(&status);
                if (pid == -1) {
                    int err = errno;
                    std::error_code ec = util::make_basic_system_error_code(err);
                    std::cerr << "wait() failed with " << ec.value() << ": " << ec.message() << "\n";
                    return EXIT_FAILURE;
                }
                // Only one child was forked here, so it is the only one
                // expected to be reported.
                REALM_ASSERT(pid == child_pid);
                if (WIFEXITED(status)) {
                    if (WEXITSTATUS(status) != 0) {
                        // Abnormal exit -> leave the loop and halt below
                        std::cerr << "Exit with nonzero status (" << WEXITSTATUS(status) << ")\n";
                        break;
                    }
                    // Child exited with status zero -> nothing to halt for
                    return EXIT_SUCCESS;
                }
                if (WIFSIGNALED(status)) {
                    std::cerr << "Killed by signal: " << strsignal(WTERMSIG(status)) << "\n";
                    break;
                }
                // Stopped or continued -> wait for next status change
            }
            // The child crashed: keep the parent process alive indefinitely.
            std::cerr << "Halted\n";
            for (;;)
                std::this_thread::sleep_for(std::chrono::hours(1));
        }
#else
        std::cerr << "'Halt on crash' feature is unavailable on this platform\n";
        return EXIT_FAILURE;
#endif // _WIN32
    }

    // When core dumps were requested and the platform exposes the resource
    // limit, adjust the soft limit on core file size. Going by the
    // `--allow-core-dump` help text, the value -1 stands for 'unlimited'.
    if (allow_core_dump) {
        if (util::system_has_rlimit(util::Resource::core_dump_size))
            util::set_soft_rlimit(util::Resource::core_dump_size, -1);
    }

    util::Thread::set_name("main");

    // Select the root logger: a plain STDERR logger, or one that also emits
    // timestamps (millisecond precision, "%FT%T" format) when timestamps were
    // requested.
    std::unique_ptr<util::Logger> root_logger;
    if (!log_timestamps) {
        root_logger = std::make_unique<util::StderrLogger>();
    }
    else {
        util::TimestampStderrLogger::Config timestamp_config;
        timestamp_config.format = "%FT%T";
        timestamp_config.precision = util::TimestampStderrLogger::Precision::milliseconds;
        root_logger = std::make_unique<util::TimestampStderrLogger>(std::move(timestamp_config));
    }

    // All logging of the testing process goes through this wrapper, which
    // applies the configured log level on top of the root logger.
    util::ThreadSafeLogger logger{*root_logger, log_level};
    logger.info("Test client started");

    // Metrics sink, configured from the metrics prefix and the StatsD address
    // and port.
    Metrics metrics{metrics_prefix, statsd_address, statsd_port};

    // Pseudo random number generator of the testing process, seeded
    // nondeterministically.
    std::mt19937_64 test_proc_random;
    util::seed_prng_nondeterministically(test_proc_random);

    // Starts out null and is pointed at the peer context once that has been
    // constructed further down. It exists so that the roundtrip time handler,
    // which is installed before the context is created, can capture it by
    // reference.
    Peer::Context* context_ptr = nullptr;

    // The sync client logs through its own level threshold and has every
    // message prefixed with "Sync: ".
    ThresholdOverrideLogger sync_base_logger{logger};
    sync_base_logger.set_level_threshold(sync_log_level);
    util::PrefixLogger sync_logger{"Sync: ", sync_base_logger};

    // Carry the command-line settings over into the sync client configuration.
    sync::Client::Config config;
    config.logger = &sync_logger;
    config.user_agent_application_info = "TestClient/" REALM_VERSION_STRING;
    config.dry_run = dry_run;
    // Connection handling
    config.one_connection_per_session = connection_per_session;
    config.tcp_no_delay = tcp_no_delay;
    config.connect_timeout = connect_timeout;
    config.connection_linger_time = connection_linger_time;
    // Heartbeat
    config.ping_keepalive_period = time_between_pings;
    config.pong_keepalive_timeout = pong_timeout;
    // Local storage and upload
    config.disable_sync_to_disk = disable_sync_to_disk;
    config.disable_upload_compaction = disable_upload_compaction;
    if (report_roundtrip_times) {
        // `context_ptr` is captured by reference because it is still null at
        // this point; it is dereferenced only when the handler is invoked.
        config.roundtrip_time_handler = [&context_ptr](milliseconds_type roundtrip_time) {
            context_ptr->add_roundtrip_time(roundtrip_time);
        };
    }
    sync::Client client{config};

    // Decompose the server URL (when one was given) into protocol envelope,
    // address, and port. The URL is rejected with `sync::BadServerUrl` if it
    // cannot be decomposed, or if its path component is anything but "/".
    //
    // `protocol_envelope` gets an explicit default (the non-SSL `realm`
    // envelope) because it is inspected further down even when no server URL
    // was specified. Leaving it uninitialized made that read undefined
    // behavior whenever the URL was omitted.
    sync::ProtocolEnvelope protocol_envelope = sync::ProtocolEnvelope::realm;
    std::string server_address;
    util::network::Endpoint::port_type server_port = 0;
    if (have_server_url) {
        std::string path;
        bool good_url =
            (client.decompose_server_url(server_url, protocol_envelope, server_address, server_port, path) &&
             path == "/");
        if (!good_url)
            throw sync::BadServerUrl{};
    }

    bool auth_ssl = false;
    switch (protocol_envelope) {
        case sync::ProtocolEnvelope::realm:
        case sync::ProtocolEnvelope::ws:
            break;
        case sync::ProtocolEnvelope::realms:
        case sync::ProtocolEnvelope::wss:
            auth_ssl = true;
            break;
    }
    util::PrefixLogger auth_logger{"Auth: ", sync_base_logger};
    sync::auth::Client::Config auth_config;
    auth_config.logger = &auth_logger;
    auth_config.request_base_path = request_base_path;
    sync::auth::Client auth{auth_ssl, server_address, server_port, app_id, std::move(auth_config)};

    // Event loop of the main thread, constructed from the sync client and the
    // interactive-mode flag.
    MainEventLoop main_event_loop{client, interactive};

    // Compatibility with mock server
    //
    // The request path is the base path alone when no application identifier
    // is set, and otherwise `<base path>/app/<app id>/realm-sync`.
    std::string sync_request_path = request_base_path;
    if (!app_id.empty()) {
        sync_request_path += "/app/";
        sync_request_path += app_id;
        sync_request_path += "/realm-sync";
    }

    // An empty trust certificate path is mapped to "none".
    util::Optional<std::string> ssl_trust_certificate_path =
        (ssl_trust_cert == "" ? util::none : util::Optional<std::string>(ssl_trust_cert));

    // Network event loop of the testing process, which is handed to the peer
    // context.
    util::network::Service test_proc_service;

    // Invoked on a synchronization error. Depending on the configured abort
    // policy, either ignores the error or stops the main event loop:
    //   never  -> always ignore
    //   fatal  -> ignore unless the error is fatal
    //   always -> always stop
    auto on_sync_error = [&](bool is_fatal) {
        if (abort_on_error == AbortOnError::never)
            return;
        if (abort_on_error == AbortOnError::fatal && !is_fatal)
            return;
        main_event_loop.stop();
    };

    // Number of characters needed to print the largest peer number, used below
    // to zero-pad peer numbers to a common width.
    int max_peer_number_width = 0;
    {
        std::ostringstream formatter;
        formatter << num_peers;
        const std::string formatted = formatter.str();
        max_peer_number_width = int(formatted.size());
    }

    // Compute peer-specific parameters
    // The per-peer values obtained by expanding the command-line templates
    // once for each peer.
    struct PeerParams {
        std::string realm_path;           // Expanded Realm path template
        std::string realm_name;           // Expanded Realm name template
        std::string username;             // Expanded username template
        std::vector<std::string> queries; // Expanded query templates, one entry per added query
    };
    std::string host_name = util::network::host_name();
    auto peer_params = std::make_unique<PeerParams[]>(num_peers);
    {
        // Index of the peer currently being expanded. The substitution
        // parameters below refer to this variable, so it has to be updated
        // before each round of template expansions.
        int peer_ndx = 0;

        // Substitution parameters available in templates:
        //   N -> one-based peer number, zero-padded to a common width
        //   I -> zero-based peer index
        //   H -> host name
        using Substituter = util::Substituter<>;
        Substituter substituter;
        substituter["N"] = [&](std::ostream& out) {
            out << std::setfill('0') << std::setw(max_peer_number_width) << (peer_ndx + 1);
        };
        substituter["I"] = &peer_ndx;
        substituter["H"] = &host_name;

        // Parse all templates up front, such that a malformed template is
        // reported before anything is expanded.
        Substituter::Template realm_path_template;
        if (!substituter.parse(realm_path, realm_path_template, logger))
            return EXIT_FAILURE;

        // With more than one peer, the Realm path has to depend on the peer.
        if (num_peers > 1 && !realm_path_template.refers_to("N") && !realm_path_template.refers_to("I")) {
            logger.error("Substitution parameter `N` or `I` must be used in Realm "
                         "path template '%1'",
                         realm_path);
            return EXIT_FAILURE;
        }

        Substituter::Template realm_name_template;
        if (!substituter.parse(realm_name, realm_name_template, logger))
            return EXIT_FAILURE;

        Substituter::Template username_template;
        if (!substituter.parse(username, username_template, logger))
            return EXIT_FAILURE;

        std::vector<Substituter::Template> query_templates;
        for (const auto& added_query : add_queries) {
            Substituter::Template query_template;
            if (!substituter.parse(added_query.second, query_template, logger))
                return EXIT_FAILURE;
            query_templates.push_back(std::move(query_template));
        }

        // Expand every template once per peer.
        for (peer_ndx = 0; peer_ndx < num_peers; ++peer_ndx) {
            PeerParams& params = peer_params[peer_ndx];
            params.realm_path = realm_path_template.expand();
            params.realm_name = realm_name_template.expand();
            params.username = username_template.expand();
            for (const auto& query_template : query_templates)
                params.queries.push_back(query_template.expand());
        }
    }

    // Create peers
    //
    // All peers are constructed against the same `Peer::Context`, whose
    // address is also stored in `context_ptr`. With exactly one peer, the peer
    // logs through `logger` directly. With two or more, each peer gets its own
    // PrefixLogger constructed with the prefix "Peer[<one-based number>]: ".
    bool reset_on_reconnect = (ptime_request_threshold == PropagationTimeThreshold::reconnect);
    Peer::Context context(client, auth, test_proc_service, test_proc_random, metrics, disable_sync_to_disk,
                          report_roundtrip_times, reset_on_reconnect);
    context_ptr = &context;
    auto peer_logger_owners = std::make_unique<std::unique_ptr<util::Logger>[]>(num_peers);
    auto peer_loggers = std::make_unique<util::Logger*[]>(num_peers);
    auto peers = std::make_unique<std::unique_ptr<Peer>[]>(num_peers);
    if (num_peers >= 2) {
        std::ostringstream prefix_out;
        prefix_out.imbue(std::locale::classic());
        prefix_out.fill('0');
        for (int peer = 0; peer < num_peers; ++peer) {
            // Reuse the stream: clear its buffer, then format this peer's prefix
            prefix_out.str(std::string{});
            prefix_out << "Peer[" << std::setw(max_peer_number_width) << (peer + 1) << "]: ";
            peer_logger_owners[peer] = std::make_unique<util::PrefixLogger>(prefix_out.str(), logger);
            peer_loggers[peer] = peer_logger_owners[peer].get();
            peers[peer] =
                std::make_unique<Peer>(context, sync_request_path, peer_params[peer].realm_path, *peer_loggers[peer],
                                       originator_ident, verify_ssl_cert, ssl_trust_certificate_path,
                                       client_reset_config, use_trivial_cooker, on_sync_error);
        }
    }
    else if (num_peers == 1) {
        peer_loggers[0] = &logger;
        peers[0] = std::make_unique<Peer>(context, sync_request_path, peer_params[0].realm_path, logger,
                                          originator_ident, verify_ssl_cert, ssl_trust_certificate_path,
                                          client_reset_config, use_trivial_cooker, on_sync_error);
    }
    if (receive_ptime_requests) {
        for (int peer = 0; peer < num_peers; ++peer) {
            peers[peer]->prepare_receive_ptime_requests();
        }
    }

    // If a start delay is configured, sleep for a number of milliseconds drawn
    // from make_distr(start_delay, max_start_delay) before going on.
    if (start_delay > 0 || max_start_delay > 0) {
        auto start_delay_distr = make_distr(start_delay, max_start_delay);
        milliseconds_type delay = start_delay_distr(test_proc_random);
        logger.info("Delaying start of testing process by %1 milliseconds", delay);
        std::this_thread::sleep_for(std::chrono::milliseconds(delay));
        logger.info("Testing process started");
    }
    metrics.increment("client.started");
    context.init_metrics_gauges(receive_ptime_requests);

    // Log the settings of every phase that performs at least one transaction.
    // Phases are numbered from one, and skipped phases still count.
    if (total_num_transacts > 0) {
        int phase_number = 0;
        for (const PhaseSpec& phase : phases) {
            ++phase_number;
            if (!(phase.num_transacts > 0))
                continue;
            logger.info("Phase %1: num_transacts=%2, transact_period=%3, "
                        "max_transact_period=%4, send_ptime_requests=%5",
                        phase_number, phase.num_transacts, phase.transact_period, phase.max_transact_period,
                        phase.send_ptime_requests);
        }
    }
    if (receive_ptime_requests) {
        logger.info("Receiving changeset propagation time measurement requests");
    }

    // Bootstrap peers
    if (num_growths > 1) {
        logger.info("Growing number of peers from 0 to %1 in %2 steps with %3 milliseconds "
                    "between steps",
                    num_peers, num_growths, time_between_growths);
    }
    // A single buffer of random bytes, sized for the largest blob of any phase
    // that performs transactions. It stays empty when no such phase has a
    // nonzero blob size.
    std::unique_ptr<char[]> blob_owner;
    {
        std::size_t max_blob_size = 0;
        for (const PhaseSpec& phase : phases) {
            if (!(phase.num_transacts > 0))
                continue;
            if (phase.blob_size > max_blob_size)
                max_blob_size = phase.blob_size;
        }
        if (max_blob_size > 0) {
            using uchar = unsigned char;
            std::uniform_int_distribution<unsigned short> distr;
            blob_owner = std::make_unique<char[]>(max_blob_size);
            // Fill front to back; each draw is narrowed to a single byte
            for (std::size_t pos = 0; pos < max_blob_size; ++pos)
                blob_owner[pos] = char(uchar(distr(test_proc_random)));
        }
    }

    std::unique_ptr<std::unique_ptr<PeerControl>[]> peer_controls =
        std::make_unique<std::unique_ptr<PeerControl>[]>(num_peers);
    for (int i = 0; i < num_peers; ++i)
        peer_controls[i] = std::make_unique<PeerControl>(test_proc_service);
    PeerControlFuncs peer_control_funcs;

    // Starts peer `i`.
    //  - Without a server URL, the peer proceeds directly to its schedule.
    //  - With a server URL and no authentication, the peer is bound using the
    //    `access_token` in scope and an empty refresh token.
    //  - Otherwise an asynchronous login is started; its outcome is delivered
    //    to `logged_in`.
    peer_control_funcs.initiate_bind = [&](int i) {
        if (!have_server_url) {
            peer_control_funcs.begin_schedule(i);
            return;
        }
        if (auth_method == AuthMethod::none) {
            std::string refresh_token; // Empty -> no refreshing
            peers[i]->bind(protocol_envelope, server_address, server_port, peer_params[i].realm_name,
                           access_token, refresh_token);
            peer_control_funcs.on_bound(i);
            return;
        }
        peer_loggers[i]->detail("Logging in");
        // Login completion handler. An aborted login is dropped silently;
        // every other outcome, error or not, is forwarded to `logged_in`.
        auto on_login = [i, &peer_control_funcs](std::error_code ec, std::string access_token,
                                                 std::string refresh_token) {
            if (REALM_UNLIKELY(ec == util::error::operation_aborted))
                return;
            peer_control_funcs.logged_in(i, ec, std::move(access_token), std::move(refresh_token));
        };
        switch (auth_method) {
            case AuthMethod::anon:
                auth.login_anon(on_login);
                break;
            case AuthMethod::user:
                auth.login_user(peer_params[i].username, password, on_login);
                break;
            case AuthMethod::none:
                // Handled by the early return above
                REALM_ASSERT(false);
                break;
        }
    };

    // Executed by `auth_thread`
    //
    // Receives the outcome of the login for peer `i`. On failure, the error is
    // logged and reported through `on_sync_error` as fatal. On success, the
    // bind of the peer is posted to `test_proc_service` instead of being done
    // here, with the tokens moved into the posted task.
    peer_control_funcs.logged_in = [&](int i, std::error_code ec, std::string access_token,
                                       std::string refresh_token) {
        if (REALM_UNLIKELY(ec)) {
            peer_loggers[i]->fatal("Failed to log in: %1 (error_code=%2)", ec.message(), ec);
            on_sync_error(true); // A failed login counts as a fatal error
            return;
        }
        auto bind_peer = [i, protocol_envelope, &server_address, server_port, &peers, &peer_params,
                          &peer_control_funcs, access_token = std::move(access_token),
                          refresh_token = std::move(refresh_token)] {
            peers[i]->bind(protocol_envelope, server_address, server_port, peer_params[i].realm_name, access_token,
                           refresh_token);
            peer_control_funcs.on_bound(i);
        };
        test_proc_service.post(bind_peer);
    };

    // Called once peer `i` has been bound. Unless `download_first` is set, the
    // peer's schedule begins immediately. Otherwise the schedule begins only
    // after the session reports download completion, at which point the
    // continuation is posted to `test_proc_service`.
    peer_control_funcs.on_bound = [&](int i) {
        if (!download_first) {
            peer_control_funcs.begin_schedule(i);
            return;
        }
        auto on_downloaded = [i, &peer_control_funcs, &test_proc_service](std::error_code ec) {
            if (ec == util::error::operation_aborted)
                return;
            REALM_ASSERT(!ec);
            auto start_schedule = [i, &peer_control_funcs] {
                peer_control_funcs.begin_schedule(i);
            };
            test_proc_service.post(start_schedule);
        };
        peers[i]->get_session().async_wait_for_download_completion(on_downloaded);
    };

    // Prepares peer `i` and starts its first phase: ensures the requested
    // classes exist, generates queryable objects, adds the per-peer queries,
    // launches the phase schedule, and finally, if requested, enables
    // reception of propagation time requests.
    peer_control_funcs.begin_schedule = [&](int i) {
        auto& peer = *peers[i];
        if (ensure_blob_class) {
            peer.ensure_blob_class();
        }
        if (ensure_ptime_class) {
            peer.ensure_ptime_class();
        }
        for (const std::string& class_name : ensure_query_classes) {
            peer.ensure_query_class(class_name);
        }
        // Tuple layout: <class name, count, level, max level, text>
        for (const auto& spec : generate_queryables) {
            int queryable_level = std::get<2>(spec);
            int max_queryable_level = std::get<3>(spec);
            auto level_distr = make_distr(queryable_level, max_queryable_level);
            const std::string& class_name = std::get<0>(spec);
            int n = std::get<1>(spec);
            const std::string& queryable_text = std::get<4>(spec);
            peer.generate_queryable(class_name, n, level_distr, queryable_text);
        }
        // The j'th expanded query of this peer pairs with the class name of
        // the j'th entry in `add_queries`.
        std::size_t query_ndx = 0;
        for (const auto& entry : add_queries) {
            const std::string& class_name = entry.first;
            const std::string& query = peer_params[i].queries[query_ndx];
            peer.add_query(class_name, query);
            ++query_ndx;
        }
        peer_control_funcs.launch_phase(i);
        if (receive_ptime_requests) {
            peer.enable_receive_ptime_requests();
        }
    };

    // Number of peers that have run out of phases with transactions
    int num_peers_at_transact_completion = 0;
    // Advances peer `i` past phases without transactions. If a phase with
    // transactions is found, its first transaction is scheduled. If there are
    // no phases left, the peer is counted as done with transactions and the
    // end-of-schedule check is made.
    peer_control_funcs.launch_phase = [&](int i) {
        REALM_ASSERT(peer_controls[i]->phase_ndx <= int(phases.size()));
        auto& control = *peer_controls[i];
        for (; control.phase_ndx < int(phases.size()); ++control.phase_ndx) {
            if (phases[control.phase_ndx].num_transacts > 0) {
                peer_control_funcs.sched_perform_transaction(i);
                return;
            }
        }
        bool all_peers_done = (++num_peers_at_transact_completion == num_peers);
        if (all_peers_done) {
            logger.info("Transactions completed for all peers");
        }
        peer_control_funcs.check_end_of_schedule(i);
    };

    // Arms the transaction timer of peer `i` with a period drawn from the
    // current phase's [transact_period, max_transact_period] distribution.
    // When the timer expires, `perform_transaction` is run. A canceled wait
    // does nothing.
    peer_control_funcs.sched_perform_transaction = [&](int i) {
        REALM_ASSERT(peer_controls[i]->phase_ndx < int(phases.size()));
        const PhaseSpec& phase = phases[peer_controls[i]->phase_ndx];
        REALM_ASSERT(peer_controls[i]->transact_ndx < phase.num_transacts);
        milliseconds_type wait_ms = make_distr(phase.transact_period, phase.max_transact_period)(test_proc_random);
        auto on_timer = [i, &peer_control_funcs](std::error_code ec) {
            if (ec == util::error::operation_aborted)
                return;
            REALM_ASSERT(!ec);
            peer_control_funcs.perform_transaction(i);
        };
        peer_controls[i]->transact_timer.async_wait(std::chrono::milliseconds(wait_ms), on_timer);
    };

    // Performs the next transaction of the current phase for peer `i`.
    //
    // The transaction is configured from the phase specification and is handed
    // the leading `phase.blob_size` bytes of the shared random blob. After the
    // transaction, either the next transaction of the same phase is scheduled,
    // or, if the phase is exhausted, the per-phase transaction counter is
    // reset and the next phase is launched.
    peer_control_funcs.perform_transaction = [&](int i) {
        REALM_ASSERT(peer_controls[i]->phase_ndx < int(phases.size()));
        const PhaseSpec& phase = phases[peer_controls[i]->phase_ndx];
        peer_loggers[i]->detail("Performing transaction (%1/%2) of phase (%3/%4)", peer_controls[i]->transact_ndx + 1,
                                phase.num_transacts, peer_controls[i]->phase_ndx + 1, phases.size());
        // `blob_owner` is empty when no transacting phase has a nonzero blob
        // size. Subscripting an empty `std::unique_ptr<char[]>` has undefined
        // behavior (and trips library assertions in hardened builds), so the
        // pointer is obtained with get(), which is well defined and yields
        // null in that case.
        BinaryData blob{blob_owner.get(), phase.blob_size};
        Peer::TransactSpec transact;
        transact.num_blobs = phase.num_blobs;
        transact.blob_label = phase.blob_label;
        transact.blob_kind = phase.blob_kind;
        transact.blob_level_distr = make_distr(phase.blob_level, phase.max_blob_level);
        transact.replace_blobs = phase.replace_blobs;
        transact.send_ptime_request = phase.send_ptime_requests;
        peers[i]->perform_transaction(blob, transact);
        if (++peer_controls[i]->transact_ndx < phase.num_transacts) {
            // More transactions remain in this phase
            peer_control_funcs.sched_perform_transaction(i);
            return;
        }
        // Phase exhausted: reset the counter and move on to the next phase
        peer_controls[i]->transact_ndx = 0;
        ++peer_controls[i]->phase_ndx;
        peer_control_funcs.launch_phase(i);
    };

    // Triggers `on_end_of_schedule` for peer `i` once the peer has no phases
    // left to run. Does nothing while phases remain.
    peer_control_funcs.check_end_of_schedule = [&](int i) {
        bool phases_remain = (peer_controls[i]->phase_ndx < int(phases.size()));
        if (!phases_remain) {
            peer_control_funcs.on_end_of_schedule(i);
        }
    };

    // Number of peers that have reached the end of their schedule
    int num_peers_at_end_of_schedule = 0;
    // Called when peer `i` has completed its schedule. With a server URL, the
    // peer goes on to wait for upload completion. Without one, the test
    // procedure ends as soon as the last peer gets here.
    peer_control_funcs.on_end_of_schedule = [&](int i) {
        bool last_peer = (++num_peers_at_end_of_schedule == num_peers);
        if (last_peer) {
            logger.info("End of schedule for all peers");
        }
        if (have_server_url) {
            peer_control_funcs.initiate_wait_for_upload_completion(i);
            return;
        }
        if (last_peer) {
            main_event_loop.end_of_test_proc();
        }
    };

    // Asks the session of peer `i` to report upload completion. When it does,
    // `on_upload_completion` is posted to `test_proc_service`. A canceled wait
    // is ignored.
    peer_control_funcs.initiate_wait_for_upload_completion = [&](int i) {
        auto on_uploaded = [i, &peer_control_funcs, &test_proc_service](std::error_code ec) {
            if (ec == util::error::operation_aborted)
                return;
            REALM_ASSERT(!ec);
            auto notify = [i, &peer_control_funcs] {
                peer_control_funcs.on_upload_completion(i);
            };
            test_proc_service.post(notify);
        };
        peers[i]->get_session().async_wait_for_upload_completion(on_uploaded);
    };

    // Number of peers whose upload has completed
    int num_peers_at_upload_completion = 0;
    // Records upload completion for peer `i`. Once all peers have reported,
    // `on_upload_completion_for_all` is run.
    peer_control_funcs.on_upload_completion = [&](int i) {
        if (num_peers > 1)
            peer_loggers[i]->detail("Upload complete");
        REALM_ASSERT(num_peers > 0);
        if (++num_peers_at_upload_completion >= num_peers)
            peer_control_funcs.on_upload_completion_for_all();
    };

    // Starts the wait for download completion on every peer, in index order.
    peer_control_funcs.on_upload_completion_for_all = [&] {
        logger.info("Upload complete for all peers");
        int peer = 0;
        while (peer < num_peers)
            peer_control_funcs.initiate_wait_for_download_completion(peer++);
    };

    // Asks the session of peer `i` to report download completion. When it
    // does, `on_download_completion` is posted to `test_proc_service`. A
    // canceled wait is ignored.
    peer_control_funcs.initiate_wait_for_download_completion = [&](int i) {
        auto on_downloaded = [i, &peer_control_funcs, &test_proc_service](std::error_code ec) {
            if (ec == util::error::operation_aborted)
                return;
            REALM_ASSERT(!ec);
            auto notify = [i, &peer_control_funcs] {
                peer_control_funcs.on_download_completion(i);
            };
            test_proc_service.post(notify);
        };
        peers[i]->get_session().async_wait_for_download_completion(on_downloaded);
    };

    // Number of peers that have finished all of their download waits
    int num_peers_at_download_completion = 0;
    // Handles one completed download wait for peer `i`. The wait is repeated
    // until the peer has done `num_download_waits` of them. After that, the
    // result sets are dumped if requested, and once every peer has finished,
    // `on_download_completion_for_all` is run.
    peer_control_funcs.on_download_completion = [&](int i) {
        bool more_waits = (++peer_controls[i]->download_wait_ndx < num_download_waits);
        if (more_waits) {
            peer_control_funcs.initiate_wait_for_download_completion(i);
            return;
        }
        if (num_peers > 1) {
            peer_loggers[i]->detail("Download complete");
        }
        if (dump_result_sets) {
            peers[i]->dump_result_sets();
        }
        REALM_ASSERT(num_peers > 0);
        if (++num_peers_at_download_completion >= num_peers)
            peer_control_funcs.on_download_completion_for_all();
    };

    // Final step of the test procedure. Unless `follow` is set, the main event
    // loop is told that the test procedure has ended; in follow mode the
    // process just keeps running.
    peer_control_funcs.on_download_completion_for_all = [&] {
        if (have_server_url) {
            logger.info("Download complete for all peers");
        }
        if (!follow) {
            main_event_loop.end_of_test_proc();
            return;
        }
        logger.info("Tracking further changes from the server...");
    };

    util::network::DeadlineTimer growth_timer{test_proc_service};
    int current_num_peers = 0;
    int growth_ndx = 0;
    std::function<void()> grow = [&] {
        REALM_ASSERT(num_growths > 0);
        int new_num_peers = int(std::round(double(growth_ndx + 1) / num_growths * num_peers));
        if (num_growths > 1) {
            logger.detail("Growing number of peers to %1 (step %2/%3)", new_num_peers, growth_ndx + 1, num_growths);
        }
        for (int i = current_num_peers; i < new_num_peers; ++i) {
            peer_control_funcs.initiate_bind(i);
        }
        current_num_peers = new_num_peers;
        if (++growth_ndx == num_growths) {
            if (num_growths > 1)
                logger.info("Done growing number of peers");
            return;
        }
        auto handler = [&](std::error_code ec) {
            if (ec == util::error::operation_aborted)
                return;
            grow();
        };
        growth_timer.async_wait(std::chrono::milliseconds(time_between_growths), std::move(handler));
    };
    {
        // Entry point of the test procedure, posted to `test_proc_service`.
        // With no peers at all, jump straight to the final step; otherwise
        // begin growing the number of peers.
        auto kickoff = [&] {
            if (num_peers < 1) {
                peer_control_funcs.on_download_completion_for_all();
                return;
            }
            grow();
        };
        test_proc_service.post(kickoff);
    }

    // A timer on `test_proc_service` that is re-armed for another hour every
    // time it expires, until its wait is canceled.
    // NOTE(review): presumably this keeps the service from running out of
    // pending operations (e.g. in follow mode) -- confirm.
    util::network::DeadlineTimer keep_alive_timer{test_proc_service};
    std::function<void()> sched_keep_alive_wait;
    sched_keep_alive_wait = [&] {
        auto on_timer = [&](std::error_code ec) {
            if (ec == util::error::operation_aborted)
                return;
            REALM_ASSERT(!ec);
            sched_keep_alive_wait();
        };
        keep_alive_timer.async_wait(std::chrono::hours(1), on_timer);
    };
    {
        // Arm the timer for the first time from within the service
        auto start_keep_alive = [&] {
            sched_keep_alive_wait();
        };
        test_proc_service.post(start_keep_alive);
    }

    // Create an execution guard for each of the sync client, the auth
    // service, and the test procedure service, all tied to `main_event_loop`,
    // and start them under the thread names "sync", "auth", and "test_proc".
    // NOTE(review): going by its name, start_with_signals_blocked() launches
    // the thread with signals blocked -- confirm in thread_exec_guard.hpp.
    auto sync_thread = util::make_thread_exec_guard(client, main_event_loop);
    sync_thread.start_with_signals_blocked("sync");

    // The auth thread is only started when a server URL is given and an
    // authentication method other than `none` is selected, which is the only
    // case in which `initiate_bind` logs in.
    auto auth_thread = util::make_thread_exec_guard(auth, main_event_loop);
    if (have_server_url && auth_method != AuthMethod::none)
        auth_thread.start_with_signals_blocked("auth");

    auto test_proc_thread = util::make_thread_exec_guard(test_proc_service, main_event_loop);
    test_proc_thread.start_with_signals_blocked("test_proc");

    // Run the main event loop on the calling thread. Elsewhere in this
    // function it is ended through main_event_loop.stop() (on error) or
    // main_event_loop.end_of_test_proc().
    main_event_loop.run();

    // Shut the guards down in the order they were created.
    // NOTE(review): stop_and_rethrow() presumably rethrows an exception
    // captured on the guarded thread; if so, a throw from an earlier call
    // skips the later calls here -- confirm the guards' destructors stop the
    // remaining threads.
    sync_thread.stop_and_rethrow();
    auth_thread.stop_and_rethrow();
    test_proc_thread.stop_and_rethrow();
}
